package com.qsl.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;


/**
 * 第一个Kafka配置
 * @author: 青石路
 */
@Configuration
@ConditionalOnProperty(name = "spring.kafka.first.enabled", havingValue = "true")
public class FirstKafkaConfig {

    @ConfigurationProperties(prefix = "spring.kafka.first")
    @Bean("firstKafkaProperties")
    @Primary
    public KafkaProperties firstKafkaProperties() {
        return new KafkaProperties();
    }

    @Bean("firstKafkaTemplate")
    public KafkaTemplate<String, String> firstKafkaTemplate() {
        return new KafkaTemplate<>(firstProducerFactory());
    }

    @Bean("firstKafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> fisrtKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstConsumerFactory());
        factory.getContainerProperties().setAckMode(firstKafkaProperties().getListener().getAckMode());
        return factory;
    }

    @Bean("firstConsumerFactory")
    public ConsumerFactory<String, String> firstConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(firstKafkaProperties().buildConsumerProperties());
    }

    @Bean("firstProducerFactory")
    public DefaultKafkaProducerFactory<String, String> firstProducerFactory() {
        return new DefaultKafkaProducerFactory<>(firstKafkaProperties().buildProducerProperties());
    }
}